composable streams of query results #446
base: main
Conversation
I think I get it, but the language around queries, query streams, index streams, and merging is a bit murky even for me after skimming the code. Can you give some bullets of how each term relates to one another in the readme?
I'm starting to wonder if all the query helpers could get bundled into a `@convex-dev/query-utils` library that you own. It might be a good organization, and more "official-feeling" than helpers. I've already pulled migrations and rate limiting into components, and maybe I should pull validators into `validator-utils` or something, then have helpers be for developing APIs before crystalizing them into separate packages... one downside of course is discoverability.
packages/convex-helpers/README.md
Outdated
> With the `stream` helper, you can construct a stream with the same syntax as
> you would use `DatabaseReader`. Once you have a stream, you can compose them
> to get more streams (still ordered by the same index) with `mergeStreams`, and
what does "get more streams" mean here? reading this first, I'm not sure as a developer (yet)
```ts
const authorStreams = authors.map((author) =>
  stream(ctx.db, schema).query("messages").withIndex("by_author", (q) => q.eq("author", author)),
);
const allAuthorsStream = mergeStreams(authorStreams);
```
I'm not sure what "mergeStreams" means here. each will be concatenated? I'll try to think of other names as I go, but maybe we drop some comments in here that walk through what each step does.
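To make the question concrete, here is a self-contained model of what a merged stream could do, assuming `mergeStreams` performs an ordered k-way merge of index-sorted inputs (so it interleaves rather than concatenates). `mergeSorted` and its comparator are invented names for this sketch, not the helper's actual code:

```typescript
// Illustrative model only: each input stream is assumed to already be sorted
// by the same index, and the merged stream repeatedly yields the smallest
// remaining head element across all inputs.
async function* mergeSorted<T>(
  streams: AsyncIterable<T>[],
  cmp: (a: T, b: T) => number,
): AsyncGenerator<T> {
  const iters = streams.map((s) => s[Symbol.asyncIterator]());
  const heads = await Promise.all(iters.map((it) => it.next()));
  while (true) {
    // Find the input whose current head sorts first.
    let best = -1;
    for (let i = 0; i < heads.length; i++) {
      if (heads[i].done) continue;
      if (best === -1 || cmp(heads[i].value as T, heads[best].value as T) < 0) best = i;
    }
    if (best === -1) return; // all inputs exhausted
    yield heads[best].value as T;
    heads[best] = await iters[best].next();
  }
}
```

Under that assumption, merging `[1, 4, 7]` and `[2, 3, 9]` yields `1, 2, 3, 4, 7, 9`, not one list appended to the other.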
```ts
}
// Otherwise, it's a POJO.
const keys = Object.keys(v).sort();
const pojo: Value[] = keys.map((k) => [k, v[k]!]);
```
```diff
- const pojo: Value[] = keys.map((k) => [k, v[k]!]);
+ const pojo: (Value | undefined)[] = keys.map((k) => [k, v[k]]);
```
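As a neutral illustration of what this line is doing (a sketch, not the helper's code): sorting the keys turns a plain object into a canonical entry list, so two objects with the same fields compare equal regardless of insertion order. The type question the suggestion surfaces is that `v[k]` can be `undefined` when a key is present but explicitly set to `undefined`:

```typescript
// Illustrative sketch: canonicalEntries is an invented name. Sorted keys give
// a canonical [key, value] list; note that a key explicitly set to undefined
// yields an undefined value here, which a Value[] annotation would hide.
function canonicalEntries(v: Record<string, unknown>): [string, unknown][] {
  const keys = Object.keys(v).sort();
  return keys.map((k) => [k, v[k]]);
}
```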
i don't think they're the same thing
@@ -0,0 +1,86 @@
```ts
import { Value } from "convex/values";
```
FYI this will make an entrypoint at convex-helpers/server/compare
I thought to do that I had to make an export in package.json
(I don't mind making this an entrypoint, but it shouldn't be necessary for now)
```ts
}
lt(field: string, value: Value) {
  if (!this.canUpperBound(field)) {
    throw new Error(`Cannot use lt on field '${field}'`);
```
what's an example of this?
this is like if you have an index on `["a", "b"]` and you do `q => q.lt("b", 1)`, or `q => q.lt("a", 1).lt("a", 1)`, or `q => q.eq("a", 1).lt("a", 1)`. there are tests :)
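The examples in that reply can be sketched as a standalone rule check. Everything here is an assumed reading of the semantics with invented names: on an index like `["a", "b"]`, a range operator such as `lt` must target the first index field not already fixed by `eq`, and a field can only be bounded once.

```typescript
// Sketch of the assumed rule behind canUpperBound (canUpperBoundSketch is an
// invented name, not the PR's implementation).
function canUpperBoundSketch(
  indexFields: string[],
  eqFields: string[],      // fields already constrained with eq, in order
  boundedFields: string[], // fields already given an upper bound
  field: string,
): boolean {
  // eq constraints must form a prefix of the index fields
  const isPrefix = eqFields.every((f, i) => indexFields[i] === f);
  if (!isPrefix) return false;
  // the range field must be the next index field, and not already bounded
  return indexFields[eqFields.length] === field && !boundedFields.includes(field);
}
```

Under this reading, `q.eq("a", 1).lt("b", 1)` is valid, while all three examples from the reply are rejected.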
```ts
}

/**
 * Merge multiple streams, provided in any order, into a single stream.
```
Document what order the data will be returned in, and add an example like you do for `concatStream`
```ts
/**
 * Apply a filter to a stream.
 *
 * Watch out for sparse filters, as they may read unbounded amounts of data.
```
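The warning in that doc comment can be modeled in isolation (illustrative only, not `filterStream`'s implementation): to produce even a few matches, a sparse predicate may force the underlying stream to be consumed far past the number of results returned.

```typescript
// Illustrative model: wrap a source stream with a predicate and count how
// many rows the source yields per match delivered downstream.
async function* filteredWithCount<T>(
  source: AsyncIterable<T>,
  predicate: (x: T) => boolean,
  counter: { reads: number },
): AsyncGenerator<T> {
  for await (const x of source) {
    counter.reads++; // every row read from the source counts against us
    if (predicate(x)) yield x;
  }
}
```

With a predicate matching 1 row in 100, fetching just 3 results reads a couple hundred rows, which is the "unbounded reads" risk in miniature.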
Could we pass in a `rowReadLimit` or something so folks can bound it - like "give me 10 non-deleted users' messages, but cap the search at 1k users"? Would that end up with surprising results later, where pagination might think it's done? Maybe throw a catchable error with the results so far? Or do you have other ideas for how to gracefully fail there? Maybe it's "usually" totally fine, but there was a burst of user deletions, and now a query is stuck in a failing state?
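A hypothetical sketch of that `rowReadLimit` idea - every name here is invented, nothing like this exists in convex-helpers - capping the rows consumed and throwing a catchable error that carries the results gathered so far:

```typescript
// Hypothetical sketch only; ReadLimitExceeded and takeFilteredWithLimit are
// invented names illustrating the reviewer's suggestion.
class ReadLimitExceeded<T> extends Error {
  constructor(public partialResults: T[]) {
    super("row read limit exceeded");
  }
}

async function takeFilteredWithLimit<T>(
  source: AsyncIterable<T>,
  predicate: (x: T) => boolean,
  n: number,
  rowReadLimit: number,
): Promise<T[]> {
  const results: T[] = [];
  let reads = 0;
  for await (const x of source) {
    if (++reads > rowReadLimit) {
      // Surface the partial results so the caller can decide how to degrade.
      throw new ReadLimitExceeded(results);
    }
    if (predicate(x)) {
      results.push(x);
      if (results.length === n) break;
    }
  }
  return results;
}
```

The open question from the comment still stands: if a paginated query catches this mid-page, the cursor semantics get murky, since downstream code can't tell "done" from "gave up".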
tbh I hit some review fatigue when I got to the huge stream.ts and compare.ts logic. I can look more closely at whatever feels most important to you, or go deep into it once I have self-hosting things under control
Co-authored-by: Ian Macartney <ian@convex.dev>
…convex-helpers into lee/paginator-stream
```ts
  DataModel extends GenericDataModel,
  T extends TableNamesInDataModel<DataModel>,
> {
  iterWithKeys(): AsyncIterable<[DocumentByName<DataModel, T> | null, IndexKey]>;
```
TODO: IndexKey can contain undefined, make sure we're not serializing it as a convex value
- e.g. `stream(ctx.db, schema).query("messages").withIndex("by_author", (q) => q.eq("author", "user1"))`
- `mergeStreams` combines multiple streams into a new stream, ordered by the same index.
- `filterStream` filters out documents from a stream based on a TypeScript predicate.
- `queryStream` converts a stream into a query, so you can call `.first()`, `.collect()`, `.paginate()`, etc.
necessary?
## Composable streams of query results

These are helper functions for constructing and composing streams of query results.
add a motivating example here
```ts
stream(ctx.db, schema)
  .query("messages")
  .withIndex("by_author", (q) =>
    q.eq("author", author).eq("unread", unread),
```
does this order by `author, unread` still? or by `_creationTime`?
```ts
  );
// Merge the two streams into a single stream of all messages authored by
// `args.author`, ordered by _creationTime descending.
const allMessagesByCreationTime = mergeStreams(...messagesForUnreadStatus);
```
separate cursor for each mergeStream?
as described in the README, add some helpers for merging and filtering streams of query results.

This extracts the implementations of `paginator` and `streamQuery` into new helpers that allow better composition and more patterns.

First there's `reflect`, which allows you to construct queries with the normal syntax `reflect(ctx.db, schema).query(table).withIndex(index, indexRange).order("desc")` and then get their internal details, e.g. which index it's looking at. The naming is based on https://golangbot.com/reflection/ but it's mostly an internal library.

Then once you have a reflectable query, it can be used as a "stream", which is an async iterable of documents ordered by an index. For this reason, `stream(ctx.db, schema)` is another way of writing `reflect(ctx.db, schema)`.

Once you have a stream, you can merge them with `mergeStreams` or filter them with `filterStream`, generating more streams.

See the README for more details and motivating examples.
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.